EMMA Coverage Report (generated Thu May 22 12:08:10 CDT 2014)
[all classes][org.springframework.batch.core.step.tasklet]

COVERAGE SUMMARY FOR SOURCE FILE [TaskletStep.java]

nameclass, %method, %block, %line, %
TaskletStep.java100% (5/5)100% (36/36)94%  (591/630)95%  (137.3/144)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class TaskletStep$ChunkTransactionCallback100% (1/1)100% (5/5)90%  (346/384)92%  (79.4/86)
afterCompletion (int): void 100% (1/1)82%  (55/67)90%  (15.4/17)
doInTransaction (TransactionStatus): Object 100% (1/1)90%  (232/258)90%  (46/51)
TaskletStep$ChunkTransactionCallback (TaskletStep, ChunkContext, Semaphore): ... 100% (1/1)100% (26/26)100% (8/8)
copy (StepExecution, StepExecution): void 100% (1/1)100% (24/24)100% (6/6)
rollback (StepExecution): void 100% (1/1)100% (9/9)100% (4/4)
     
class TaskletStep100% (1/1)100% (26/26)99%  (176/177)100% (45.9/46)
afterPropertiesSet (): void 100% (1/1)91%  (10/11)97%  (2.9/3)
<static initializer> 100% (1/1)100% (4/4)100% (1/1)
TaskletStep (): void 100% (1/1)100% (4/4)100% (2/2)
TaskletStep (String): void 100% (1/1)100% (30/30)100% (7/7)
access$000 (TaskletStep): StepInterruptionPolicy 100% (1/1)100% (3/3)100% (1/1)
access$100 (TaskletStep): PlatformTransactionManager 100% (1/1)100% (3/3)100% (1/1)
access$200 (TaskletStep): TransactionAttribute 100% (1/1)100% (3/3)100% (1/1)
access$300 (TaskletStep): CompositeChunkListener 100% (1/1)100% (3/3)100% (1/1)
access$400 (): Log 100% (1/1)100% (2/2)100% (1/1)
access$500 (TaskletStep): Tasklet 100% (1/1)100% (3/3)100% (1/1)
access$600 (TaskletStep): CompositeItemStream 100% (1/1)100% (3/3)100% (1/1)
access$700 (TaskletStep): JobRepository 100% (1/1)100% (3/3)100% (1/1)
access$800 (TaskletStep): JobRepository 100% (1/1)100% (3/3)100% (1/1)
close (ExecutionContext): void 100% (1/1)100% (4/4)100% (2/2)
createSemaphore (): Semaphore 100% (1/1)100% (5/5)100% (1/1)
doExecute (StepExecution): void 100% (1/1)100% (23/23)100% (5/5)
open (ExecutionContext): void 100% (1/1)100% (5/5)100% (2/2)
registerChunkListener (ChunkListener): void 100% (1/1)100% (5/5)100% (2/2)
registerStream (ItemStream): void 100% (1/1)100% (5/5)100% (2/2)
setChunkListeners (ChunkListener []): void 100% (1/1)100% (14/14)100% (3/3)
setInterruptionPolicy (StepInterruptionPolicy): void 100% (1/1)100% (4/4)100% (2/2)
setStepOperations (RepeatOperations): void 100% (1/1)100% (4/4)100% (2/2)
setStreams (ItemStream []): void 100% (1/1)100% (14/14)100% (3/3)
setTasklet (Tasklet): void 100% (1/1)100% (11/11)100% (4/4)
setTransactionAttribute (TransactionAttribute): void 100% (1/1)100% (4/4)100% (2/2)
setTransactionManager (PlatformTransactionManager): void 100% (1/1)100% (4/4)100% (2/2)
     
class TaskletStep$1100% (1/1)100% (2/2)100% (8/8)100% (2/2)
TaskletStep$1 (TaskletStep): void 100% (1/1)100% (6/6)100% (1/1)
rollbackOn (Throwable): boolean 100% (1/1)100% (2/2)100% (1/1)
     
class TaskletStep$2100% (1/1)100% (2/2)100% (57/57)100% (10/10)
TaskletStep$2 (TaskletStep, StepExecution, Semaphore): void 100% (1/1)100% (10/10)100% (1/1)
doInChunkContext (RepeatContext, ChunkContext): RepeatStatus 100% (1/1)100% (47/47)100% (9/9)
     
class TaskletStep$UncheckedTransactionException100% (1/1)100% (1/1)100% (4/4)100% (2/2)
TaskletStep$UncheckedTransactionException (Exception): void 100% (1/1)100% (4/4)100% (2/2)

1/*
2 * Copyright 2006-2013 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.springframework.batch.core.step.tasklet;
17 
18import java.util.concurrent.Semaphore;
19 
20import org.apache.commons.logging.Log;
21import org.apache.commons.logging.LogFactory;
22import org.springframework.batch.core.BatchStatus;
23import org.springframework.batch.core.ChunkListener;
24import org.springframework.batch.core.JobInterruptedException;
25import org.springframework.batch.core.StepContribution;
26import org.springframework.batch.core.StepExecution;
27import org.springframework.batch.core.StepExecutionListener;
28import org.springframework.batch.core.listener.CompositeChunkListener;
29import org.springframework.batch.core.repository.JobRepository;
30import org.springframework.batch.core.scope.context.ChunkContext;
31import org.springframework.batch.core.scope.context.StepContextRepeatCallback;
32import org.springframework.batch.core.step.AbstractStep;
33import org.springframework.batch.core.step.FatalStepExecutionException;
34import org.springframework.batch.core.step.StepInterruptionPolicy;
35import org.springframework.batch.core.step.ThreadStepInterruptionPolicy;
36import org.springframework.batch.item.ExecutionContext;
37import org.springframework.batch.item.ItemReader;
38import org.springframework.batch.item.ItemStream;
39import org.springframework.batch.item.ItemWriter;
40import org.springframework.batch.item.support.CompositeItemStream;
41import org.springframework.batch.repeat.RepeatContext;
42import org.springframework.batch.repeat.RepeatOperations;
43import org.springframework.batch.repeat.RepeatStatus;
44import org.springframework.batch.repeat.support.RepeatTemplate;
45import org.springframework.transaction.PlatformTransactionManager;
46import org.springframework.transaction.TransactionStatus;
47import org.springframework.transaction.interceptor.DefaultTransactionAttribute;
48import org.springframework.transaction.interceptor.TransactionAttribute;
49import org.springframework.transaction.support.TransactionCallback;
50import org.springframework.transaction.support.TransactionSynchronization;
51import org.springframework.transaction.support.TransactionSynchronizationAdapter;
52import org.springframework.transaction.support.TransactionSynchronizationManager;
53import org.springframework.transaction.support.TransactionTemplate;
54import org.springframework.util.Assert;
55 
56/**
57 * Simple implementation of executing the step as a call to a {@link Tasklet},
58 * possibly repeated, and each call surrounded by a transaction. The structure
59 * is therefore that of a loop with transaction boundary inside the loop. The
60 * loop is controlled by the step operations (
61 * {@link #setStepOperations(RepeatOperations)}).<br/>
62 * <br/>
63 *
64 * Clients can use interceptors in the step operations to intercept or listen to
65 * the iteration on a step-wide basis, for instance to get a callback when the
66 * step is complete. Those that want callbacks at the level of an individual
67 * tasks, can specify interceptors for the chunk operations.
68 *
69 * @author Dave Syer
70 * @author Lucas Ward
71 * @author Ben Hale
72 * @author Robert Kasanicky
73 * @author Michael Minella
74 */
75@SuppressWarnings("serial")
76public class TaskletStep extends AbstractStep {
77 
78        private static final Log logger = LogFactory.getLog(TaskletStep.class);
79 
80        private RepeatOperations stepOperations = new RepeatTemplate();
81 
82        private CompositeChunkListener chunkListener = new CompositeChunkListener();
83 
84        // default to checking current thread for interruption.
85        private StepInterruptionPolicy interruptionPolicy = new ThreadStepInterruptionPolicy();
86 
87        private CompositeItemStream stream = new CompositeItemStream();
88 
89        private PlatformTransactionManager transactionManager;
90 
91        private TransactionAttribute transactionAttribute = new DefaultTransactionAttribute() {
92 
93                @Override
94                public boolean rollbackOn(Throwable ex) {
95                        return true;
96                }
97 
98        };
99 
100        private Tasklet tasklet;
101 
102        /**
103         * Default constructor.
104         */
105        public TaskletStep() {
106                this(null);
107        }
108 
109        /**
110         * @param name
111         */
112        public TaskletStep(String name) {
113                super(name);
114        }
115 
116        /*
117         * (non-Javadoc)
118         *
119         * @see
120         * org.springframework.batch.core.step.AbstractStep#afterPropertiesSet()
121         */
122        @Override
123        public void afterPropertiesSet() throws Exception {
124                super.afterPropertiesSet();
125                Assert.state(transactionManager != null, "A transaction manager must be provided");
126        }
127 
128        /**
129         * Public setter for the {@link PlatformTransactionManager}.
130         *
131         * @param transactionManager the transaction manager to set
132         */
133        public void setTransactionManager(PlatformTransactionManager transactionManager) {
134                this.transactionManager = transactionManager;
135        }
136 
137        /**
138         * Public setter for the {@link TransactionAttribute}.
139         *
140         * @param transactionAttribute the {@link TransactionAttribute} to set
141         */
142        public void setTransactionAttribute(TransactionAttribute transactionAttribute) {
143                this.transactionAttribute = transactionAttribute;
144        }
145 
146        /**
147         * Public setter for the {@link Tasklet}.
148         *
149         * @param tasklet the {@link Tasklet} to set
150         */
151        public void setTasklet(Tasklet tasklet) {
152                this.tasklet = tasklet;
153                if (tasklet instanceof StepExecutionListener) {
154                        registerStepExecutionListener((StepExecutionListener) tasklet);
155                }
156        }
157 
158        /**
159         * Register a chunk listener for callbacks at the appropriate stages in a
160         * step execution.
161         *
162         * @param listener a {@link ChunkListener}
163         */
164        public void registerChunkListener(ChunkListener listener) {
165                this.chunkListener.register(listener);
166        }
167 
168        /**
169         * Register each of the objects as listeners.
170         *
171         * @param listeners an array of listener objects of known types.
172         */
173        public void setChunkListeners(ChunkListener[] listeners) {
174                for (int i = 0; i < listeners.length; i++) {
175                        registerChunkListener(listeners[i]);
176                }
177        }
178 
179        /**
180         * Register each of the streams for callbacks at the appropriate time in the
181         * step. The {@link ItemReader} and {@link ItemWriter} are automatically
182         * registered, but it doesn't hurt to also register them here. Injected
183         * dependencies of the reader and writer are not automatically registered,
184         * so if you implement {@link ItemWriter} using delegation to another object
185         * which itself is a {@link ItemStream}, you need to register the delegate
186         * here.
187         *
188         * @param streams an array of {@link ItemStream} objects.
189         */
190        public void setStreams(ItemStream[] streams) {
191                for (int i = 0; i < streams.length; i++) {
192                        registerStream(streams[i]);
193                }
194        }
195 
196        /**
197         * Register a single {@link ItemStream} for callbacks to the stream
198         * interface.
199         *
200         * @param stream
201         */
202        public void registerStream(ItemStream stream) {
203                this.stream.register(stream);
204        }
205 
206        /**
207         * The {@link RepeatOperations} to use for the outer loop of the batch
208         * processing. Should be set up by the caller through a factory. Defaults to
209         * a plain {@link RepeatTemplate}.
210         *
211         * @param stepOperations a {@link RepeatOperations} instance.
212         */
213        public void setStepOperations(RepeatOperations stepOperations) {
214                this.stepOperations = stepOperations;
215        }
216 
217        /**
218         * Setter for the {@link StepInterruptionPolicy}. The policy is used to
219         * check whether an external request has been made to interrupt the job
220         * execution.
221         *
222         * @param interruptionPolicy a {@link StepInterruptionPolicy}
223         */
224        public void setInterruptionPolicy(StepInterruptionPolicy interruptionPolicy) {
225                this.interruptionPolicy = interruptionPolicy;
226        }
227 
228        /**
229         * Process the step and update its context so that progress can be monitored
230         * by the caller. The step is broken down into chunks, each one executing in
231         * a transaction. The step and its execution and execution context are all
232         * given an up to date {@link BatchStatus}, and the {@link JobRepository} is
233         * used to store the result. Various reporting information are also added to
234         * the current context governing the step execution, which would normally be
235         * available to the caller through the step's {@link ExecutionContext}.<br/>
236         *
237         * @throws JobInterruptedException if the step or a chunk is interrupted
238         * @throws RuntimeException if there is an exception during a chunk
239         * execution
240         *
241         */
242        @Override
243        @SuppressWarnings("unchecked")
244        protected void doExecute(StepExecution stepExecution) throws Exception {
245 
246                stream.update(stepExecution.getExecutionContext());
247                getJobRepository().updateExecutionContext(stepExecution);
248 
249                // Shared semaphore per step execution, so other step executions can run
250                // in parallel without needing the lock
251                final Semaphore semaphore = createSemaphore();
252 
253                stepOperations.iterate(new StepContextRepeatCallback(stepExecution) {
254 
255                        @Override
256                        public RepeatStatus doInChunkContext(RepeatContext repeatContext, ChunkContext chunkContext)
257                                        throws Exception {
258 
259                                StepExecution stepExecution = chunkContext.getStepContext().getStepExecution();
260 
261                                // Before starting a new transaction, check for
262                                // interruption.
263                                interruptionPolicy.checkInterrupted(stepExecution);
264 
265                                RepeatStatus result;
266                                try {
267                                        result = (RepeatStatus) new TransactionTemplate(transactionManager, transactionAttribute)
268                                        .execute(new ChunkTransactionCallback(chunkContext, semaphore));
269                                }
270                                catch (UncheckedTransactionException e) {
271                                        // Allow checked exceptions to be thrown inside callback
272                                        throw (Exception) e.getCause();
273                                }
274 
275                                chunkListener.afterChunk(chunkContext);
276 
277                                // Check for interruption after transaction as well, so that
278                                // the interrupted exception is correctly propagated up to
279                                // caller
280                                interruptionPolicy.checkInterrupted(stepExecution);
281 
282                                return result;
283                        }
284 
285                });
286 
287        }
288 
289        /**
290         * Extension point mainly for test purposes so that the behaviour of the
291         * lock can be manipulated to simulate various pathologies.
292         *
293         * @return a semaphore for locking access to the JobRepository
294         */
295        protected Semaphore createSemaphore() {
296                return new Semaphore(1);
297        }
298 
299        @Override
300        protected void close(ExecutionContext ctx) throws Exception {
301                stream.close();
302        }
303 
304        @Override
305        protected void open(ExecutionContext ctx) throws Exception {
306                stream.open(ctx);
307        }
308 
309        /**
310         * A callback for the transactional work inside a chunk. Also detects
311         * failures in the transaction commit and rollback, only panicking if the
312         * transaction status is unknown (i.e. if a commit failure leads to a clean
313         * rollback then we assume the state is consistent).
314         *
315         * @author Dave Syer
316         *
317         */
318        @SuppressWarnings("rawtypes")
319        private class ChunkTransactionCallback extends TransactionSynchronizationAdapter implements TransactionCallback {
320 
321                private final StepExecution stepExecution;
322 
323                private final ChunkContext chunkContext;
324 
325                private boolean rolledBack = false;
326 
327                private boolean stepExecutionUpdated = false;
328 
329                private StepExecution oldVersion;
330 
331                private boolean locked = false;
332 
333                private final Semaphore semaphore;
334 
335                public ChunkTransactionCallback(ChunkContext chunkContext, Semaphore semaphore) {
336                        this.chunkContext = chunkContext;
337                        this.stepExecution = chunkContext.getStepContext().getStepExecution();
338                        this.semaphore = semaphore;
339                }
340 
341                @Override
342                public void afterCompletion(int status) {
343                        try {
344                                if (status != TransactionSynchronization.STATUS_COMMITTED) {
345                                        if (stepExecutionUpdated) {
346                                                // Wah! the commit failed. We need to rescue the step
347                                                // execution data.
348                                                logger.info("Commit failed while step execution data was already updated. "
349                                                                + "Reverting to old version.");
350                                                copy(oldVersion, stepExecution);
351                                                if (status == TransactionSynchronization.STATUS_ROLLED_BACK) {
352                                                        rollback(stepExecution);
353                                                }
354                                        }
355                                        chunkListener.afterChunkError(chunkContext);
356                                }
357 
358                                if (status == TransactionSynchronization.STATUS_UNKNOWN) {
359                                        logger.error("Rolling back with transaction in unknown state");
360                                        rollback(stepExecution);
361                                        stepExecution.upgradeStatus(BatchStatus.UNKNOWN);
362                                        stepExecution.setTerminateOnly();
363                                }
364                        }
365                        finally {
366                                // Only release the lock if we acquired it, and release as late
367                                // as possible
368                                if (locked) {
369                                        semaphore.release();
370                                }
371 
372                                locked = false;
373                        }
374                }
375 
376                @Override
377                public Object doInTransaction(TransactionStatus status) {
378                        TransactionSynchronizationManager.registerSynchronization(this);
379 
380                        RepeatStatus result = RepeatStatus.CONTINUABLE;
381 
382                        StepContribution contribution = stepExecution.createStepContribution();
383 
384                        chunkListener.beforeChunk(chunkContext);
385 
386                        // In case we need to push it back to its old value
387                        // after a commit fails...
388                        oldVersion = new StepExecution(stepExecution.getStepName(), stepExecution.getJobExecution());
389                        copy(stepExecution, oldVersion);
390 
391                        try {
392 
393                                try {
394                                        try {
395                                                result = tasklet.execute(contribution, chunkContext);
396                                                if (result == null) {
397                                                        result = RepeatStatus.FINISHED;
398                                                }
399                                        }
400                                        catch (Exception e) {
401                                                if (transactionAttribute.rollbackOn(e)) {
402                                                        chunkContext.setAttribute(ChunkListener.ROLLBACK_EXCEPTION_KEY, e);
403                                                        throw e;
404                                                }
405                                        }
406                                }
407                                finally {
408 
409                                        // If the step operations are asynchronous then we need
410                                        // to synchronize changes to the step execution (at a
411                                        // minimum). Take the lock *before* changing the step
412                                        // execution.
413                                        try {
414                                                semaphore.acquire();
415                                                locked = true;
416                                        }
417                                        catch (InterruptedException e) {
418                                                logger.error("Thread interrupted while locking for repository update");
419                                                stepExecution.setStatus(BatchStatus.STOPPED);
420                                                stepExecution.setTerminateOnly();
421                                                Thread.currentThread().interrupt();
422                                        }
423 
424                                        // Apply the contribution to the step
425                                        // even if unsuccessful
426                                        logger.debug("Applying contribution: " + contribution);
427                                        stepExecution.apply(contribution);
428 
429                                }
430 
431                                stepExecutionUpdated = true;
432 
433                                stream.update(stepExecution.getExecutionContext());
434 
435                                try {
436                                        // Going to attempt a commit. If it fails this flag will
437                                        // stay false and we can use that later.
438                                        getJobRepository().updateExecutionContext(stepExecution);
439                                        stepExecution.incrementCommitCount();
440                                        logger.debug("Saving step execution before commit: " + stepExecution);
441                                        getJobRepository().update(stepExecution);
442                                }
443                                catch (Exception e) {
444                                        // If we get to here there was a problem saving the step
445                                        // execution and we have to fail.
446                                        String msg = "JobRepository failure forcing rollback";
447                                        logger.error(msg, e);
448                                        throw new FatalStepExecutionException(msg, e);
449                                }
450                        }
451                        catch (Error e) {
452                                logger.debug("Rollback for Error: " + e.getClass().getName() + ": " + e.getMessage());
453                                rollback(stepExecution);
454                                throw e;
455                        }
456                        catch (RuntimeException e) {
457                                logger.debug("Rollback for RuntimeException: " + e.getClass().getName() + ": " + e.getMessage());
458                                rollback(stepExecution);
459                                throw e;
460                        }
461                        catch (Exception e) {
462                                logger.debug("Rollback for Exception: " + e.getClass().getName() + ": " + e.getMessage());
463                                rollback(stepExecution);
464                                // Allow checked exceptions
465                                throw new UncheckedTransactionException(e);
466                        }
467 
468                        return result;
469 
470                }
471 
472                private void rollback(StepExecution stepExecution) {
473                        if (!rolledBack) {
474                                stepExecution.incrementRollbackCount();
475                                rolledBack = true;
476                        }
477                }
478 
479                private void copy(final StepExecution source, final StepExecution target) {
480                        target.setVersion(source.getVersion());
481                        target.setWriteCount(source.getWriteCount());
482                        target.setFilterCount(source.getFilterCount());
483                        target.setCommitCount(source.getCommitCount());
484                        target.setExecutionContext(new ExecutionContext(source.getExecutionContext()));
485                }
486 
487        }
488 
489        /**
490         * Convenience wrapper for a checked exception so that it can cause a
491         * rollback and be extracted afterwards.
492         *
493         * @author Dave Syer
494         *
495         */
496        private static class UncheckedTransactionException extends RuntimeException {
497 
498                public UncheckedTransactionException(Exception e) {
499                        super(e);
500                }
501 
502        }
503 
504}

[all classes][org.springframework.batch.core.step.tasklet]
EMMA 2.0.5312 (C) Vladimir Roubtsov