EMMA Coverage Report (generated Fri Jan 30 13:20:29 EST 2009)
[all classes][org.springframework.batch.core.step.item]

COVERAGE SUMMARY FOR SOURCE FILE [ItemOrientedStep.java]

nameclass, %method, %block, %line, %
ItemOrientedStep.java100% (5/5)93%  (25/27)93%  (477/512)93%  (124.2/134)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class ItemOrientedStep$1100% (1/1)50%  (1/2)75%  (6/8)50%  (1/2)
rollbackOn (Throwable): boolean 0%   (0/1)0%   (0/2)0%   (0/1)
ItemOrientedStep$1 (ItemOrientedStep): void 100% (1/1)100% (6/6)100% (1/1)
     
class ItemOrientedStep$ExceptionHolder100% (1/1)75%  (3/4)82%  (14/17)80%  (4/5)
getException (): Exception 0%   (0/1)0%   (0/3)0%   (0/1)
ItemOrientedStep$ExceptionHolder (): void 100% (1/1)100% (3/3)100% (1/1)
hasException (): boolean 100% (1/1)100% (7/7)100% (1/1)
setException (Exception): void 100% (1/1)100% (4/4)100% (2/2)
     
class ItemOrientedStep$2100% (1/1)100% (2/2)89%  (220/247)89%  (56.2/63)
doInIteration (RepeatContext): ExitStatus 100% (1/1)89%  (208/235)89%  (55.2/62)
ItemOrientedStep$2 (ItemOrientedStep, StepExecution, ItemOrientedStep$Excepti... 100% (1/1)100% (12/12)100% (1/1)
     
class ItemOrientedStep$3100% (1/1)100% (2/2)95%  (39/41)88%  (7/8)
doInIteration (RepeatContext): ExitStatus 100% (1/1)93%  (27/29)86%  (6/7)
ItemOrientedStep$3 (ItemOrientedStep, StepExecution, StepContribution): void 100% (1/1)100% (12/12)100% (1/1)
     
class ItemOrientedStep100% (1/1)100% (17/17)99%  (198/199)100% (55.9/56)
<static initializer> 100% (1/1)91%  (10/11)90%  (0.9/1)
ItemOrientedStep (String): void 100% (1/1)100% (36/36)100% (8/8)
close (ExecutionContext): void 100% (1/1)100% (5/5)100% (2/2)
doExecute (StepExecution): ExitStatus 100% (1/1)100% (27/27)100% (5/5)
open (ExecutionContext): void 100% (1/1)100% (5/5)100% (2/2)
processChunk (StepExecution, StepContribution): ExitStatus 100% (1/1)100% (12/12)100% (2/2)
processRollback (StepExecution, StepContribution, ItemOrientedStep$ExceptionH... 100% (1/1)100% (42/42)100% (14/14)
registerStream (ItemStream): void 100% (1/1)100% (5/5)100% (2/2)
setChunkOperations (RepeatOperations): void 100% (1/1)100% (4/4)100% (2/2)
setInterruptionPolicy (StepInterruptionPolicy): void 100% (1/1)100% (4/4)100% (2/2)
setItemHandler (ItemHandler): void 100% (1/1)100% (4/4)100% (2/2)
setStepExecutionListeners (StepExecutionListener []): void 100% (1/1)100% (14/14)100% (3/3)
setStepOperations (RepeatOperations): void 100% (1/1)100% (4/4)100% (2/2)
setStreams (ItemStream []): void 100% (1/1)100% (14/14)100% (3/3)
setSynchronizer (StepExecutionSynchronizer): void 100% (1/1)100% (4/4)100% (2/2)
setTransactionAttribute (TransactionAttribute): void 100% (1/1)100% (4/4)100% (2/2)
setTransactionManager (PlatformTransactionManager): void 100% (1/1)100% (4/4)100% (2/2)

1/*
2 * Copyright 2006-2007 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.springframework.batch.core.step.item;
17 
18import org.apache.commons.logging.Log;
19import org.apache.commons.logging.LogFactory;
20import org.springframework.batch.core.BatchStatus;
21import org.springframework.batch.core.JobInterruptedException;
22import org.springframework.batch.core.StepContribution;
23import org.springframework.batch.core.StepExecution;
24import org.springframework.batch.core.StepExecutionListener;
25import org.springframework.batch.core.repository.JobRepository;
26import org.springframework.batch.core.step.AbstractStep;
27import org.springframework.batch.core.step.StepExecutionSynchronizer;
28import org.springframework.batch.core.step.StepExecutionSynchronizerFactory;
29import org.springframework.batch.core.step.StepInterruptionPolicy;
30import org.springframework.batch.core.step.ThreadStepInterruptionPolicy;
31import org.springframework.batch.item.ExecutionContext;
32import org.springframework.batch.item.ItemReader;
33import org.springframework.batch.item.ItemStream;
34import org.springframework.batch.item.ItemWriter;
35import org.springframework.batch.item.support.CompositeItemStream;
36import org.springframework.batch.repeat.ExitStatus;
37import org.springframework.batch.repeat.RepeatCallback;
38import org.springframework.batch.repeat.RepeatContext;
39import org.springframework.batch.repeat.RepeatOperations;
40import org.springframework.batch.repeat.support.RepeatTemplate;
41import org.springframework.transaction.PlatformTransactionManager;
42import org.springframework.transaction.TransactionStatus;
43import org.springframework.transaction.interceptor.DefaultTransactionAttribute;
44import org.springframework.transaction.interceptor.TransactionAttribute;
45 
46/**
47 * Simple implementation of executing the step as a set of chunks, each chunk
48 * surrounded by a transaction. The structure is therefore that of two nested
49 * loops, with transaction boundary around the whole inner loop. The outer loop
50 * is controlled by the step operations (
51 * {@link #setStepOperations(RepeatOperations)}), and the inner loop by the
52 * chunk operations ({@link #setChunkOperations(RepeatOperations)}). The inner
53 * loop should always be executed in a single thread, so the chunk operations
54 * should not do any concurrent execution. N.B. usually that means that the
55 * chunk operations should be a {@link RepeatTemplate} (which is the default).<br/>
56 * 
57 * Clients can use interceptors in the step operations to intercept or listen to
58 * the iteration on a step-wide basis, for instance to get a callback when the
59 * step is complete. Those that want callbacks at the level of an individual
60 * tasks, can specify interceptors for the chunk operations.
61 * 
62 * @author Dave Syer
63 * @author Lucas Ward
64 * @author Ben Hale
65 * @author Robert Kasanicky
66 */
67public class ItemOrientedStep extends AbstractStep {
68 
69        private static final Log logger = LogFactory.getLog(ItemOrientedStep.class);
70 
71        private RepeatOperations chunkOperations = new RepeatTemplate();
72 
73        private RepeatOperations stepOperations = new RepeatTemplate();
74 
75        // default to checking current thread for interruption.
76        private StepInterruptionPolicy interruptionPolicy = new ThreadStepInterruptionPolicy();
77 
78        private CompositeItemStream stream = new CompositeItemStream();
79 
80        private PlatformTransactionManager transactionManager;
81 
82        private TransactionAttribute transactionAttribute = new DefaultTransactionAttribute() {
83 
84                public boolean rollbackOn(Throwable ex) {
85                        return true;
86                }
87 
88        };
89 
90        private ItemHandler itemHandler;
91 
92        private StepExecutionSynchronizer synchronizer;
93 
94        /**
95         * @param name
96         */
97        public ItemOrientedStep(String name) {
98                super(name);
99                synchronizer = new StepExecutionSynchronizerFactory().getStepExecutionSynchronizer();
100        }
101 
102        /**
103         * Public setter for the {@link PlatformTransactionManager}.
104         * 
105         * @param transactionManager the transaction manager to set
106         */
107        public void setTransactionManager(PlatformTransactionManager transactionManager) {
108                this.transactionManager = transactionManager;
109        }
110 
111        /**
112         * Public setter for the {@link TransactionAttribute}.
113         * @param transactionAttribute the {@link TransactionAttribute} to set
114         */
115        public void setTransactionAttribute(TransactionAttribute transactionAttribute) {
116                this.transactionAttribute = transactionAttribute;
117        }
118 
119        /**
120         * Public setter for the {@link ItemHandler}.
121         * 
122         * @param itemHandler the {@link ItemHandler} to set
123         */
124        public void setItemHandler(ItemHandler itemHandler) {
125                this.itemHandler = itemHandler;
126        }
127 
128        /**
129         * Register each of the objects as listeners. If the {@link ItemReader} or
130         * {@link ItemWriter} themselves implements this interface they will be
131         * registered automatically, but their injected dependencies will not be.
132         * This is a good way to get access to job parameters and execution context
133         * if the tasklet is parameterised.
134         * 
135         * @param listeners an array of listener objects of known types.
136         */
137        public void setStepExecutionListeners(StepExecutionListener[] listeners) {
138                for (int i = 0; i < listeners.length; i++) {
139                        registerStepExecutionListener(listeners[i]);
140                }
141        }
142 
143        /**
144         * Register each of the streams for callbacks at the appropriate time in the
145         * step. The {@link ItemReader} and {@link ItemWriter} are automatically
146         * registered, but it doesn't hurt to also register them here. Injected
147         * dependencies of the reader and writer are not automatically registered,
148         * so if you implement {@link ItemWriter} using delegation to another object
149         * which itself is a {@link ItemStream}, you need to register the delegate
150         * here.
151         * 
152         * @param streams an array of {@link ItemStream} objects.
153         */
154        public void setStreams(ItemStream[] streams) {
155                for (int i = 0; i < streams.length; i++) {
156                        registerStream(streams[i]);
157                }
158        }
159 
160        /**
161         * Register a single {@link ItemStream} for callbacks to the stream
162         * interface.
163         * 
164         * @param stream
165         */
166        public void registerStream(ItemStream stream) {
167                this.stream.register(stream);
168        }
169 
170        /**
171         * The {@link RepeatOperations} to use for the outer loop of the batch
172         * processing. Should be set up by the caller through a factory. Defaults to
173         * a plain {@link RepeatTemplate}.
174         * 
175         * @param stepOperations a {@link RepeatOperations} instance.
176         */
177        public void setStepOperations(RepeatOperations stepOperations) {
178                this.stepOperations = stepOperations;
179        }
180 
181        /**
182         * The {@link RepeatOperations} to use for the inner loop of the batch
183         * processing. should be set up by the caller through a factory. defaults to
184         * a plain {@link RepeatTemplate}.
185         * 
186         * @param chunkOperations a {@link RepeatOperations} instance.
187         */
188        public void setChunkOperations(RepeatOperations chunkOperations) {
189                this.chunkOperations = chunkOperations;
190        }
191 
192        /**
193         * Setter for the {@link StepInterruptionPolicy}. The policy is used to
194         * check whether an external request has been made to interrupt the job
195         * execution.
196         * 
197         * @param interruptionPolicy a {@link StepInterruptionPolicy}
198         */
199        public void setInterruptionPolicy(StepInterruptionPolicy interruptionPolicy) {
200                this.interruptionPolicy = interruptionPolicy;
201        }
202 
203        /**
204         * Mostly useful for testing, but could be used to remove dependence on
205         * backport concurrency utilities. Public setter for the
206         * {@link StepExecutionSynchronizer}.
207         * 
208         * @param synchronizer the {@link StepExecutionSynchronizer} to set
209         */
210        public void setSynchronizer(StepExecutionSynchronizer synchronizer) {
211                this.synchronizer = synchronizer;
212        }
213 
214        /**
215         * Process the step and update its context so that progress can be monitored
216         * by the caller. The step is broken down into chunks, each one executing in
217         * a transaction. The step and its execution and execution context are all
218         * given an up to date {@link BatchStatus}, and the {@link JobRepository} is
219         * used to store the result. Various reporting information are also added to
220         * the current context (the {@link RepeatContext} governing the step
221         * execution, which would normally be available to the caller somehow
222         * through the step's {@link ExecutionContext}.<br/>
223         * 
224         * @throws JobInterruptedException if the step or a chunk is interrupted
225         * @throws RuntimeException if there is an exception during a chunk
226         * execution
227         * 
228         */
229        protected ExitStatus doExecute(final StepExecution stepExecution) throws Exception {
230                stream.update(stepExecution.getExecutionContext());
231                getJobRepository().saveOrUpdateExecutionContext(stepExecution);
232                itemHandler.mark();
233 
234                final ExceptionHolder fatalException = new ExceptionHolder();
235 
236                return stepOperations.iterate(new RepeatCallback() {
237 
238                        public ExitStatus doInIteration(RepeatContext context) throws Exception {
239                                final StepContribution contribution = stepExecution.createStepContribution();
240                                // Before starting a new transaction, check for
241                                // interruption.
242                                if (stepExecution.isTerminateOnly()) {
243                                        context.setTerminateOnly();
244                                }
245                                interruptionPolicy.checkInterrupted(stepExecution);
246 
247                                ExitStatus exitStatus = ExitStatus.CONTINUABLE;
248 
249                                TransactionStatus transaction = transactionManager.getTransaction(transactionAttribute);
250 
251                                boolean locked = false;
252                                // Throwable that contains any exceptions that are caught but
253                                // are propagated up because
254                                // they shouldn't cause a rollback.
255                                Throwable nonRollbackException = null;
256 
257                                try {
258 
259                                        exitStatus = processChunk(stepExecution, contribution);
260 
261                                        contribution.incrementCommitCount();
262 
263                                        // If the step operations are asynchronous then we need
264                                        // to synchronize changes to the step execution (at a
265                                        // minimum).
266                                        try {
267                                                synchronizer.lock(stepExecution);
268                                                locked = true;
269                                        }
270                                        catch (InterruptedException e) {
271                                                stepExecution.setStatus(BatchStatus.STOPPED);
272                                                Thread.currentThread().interrupt();
273                                        }
274 
275                                        // Apply the contribution to the step
276                                        // only if chunk was successful
277                                        stepExecution.apply(contribution);
278 
279                                        // Attempt to flush before the step execution and stream
280                                        // state are updated
281                                        itemHandler.flush();
282 
283                                        stream.update(stepExecution.getExecutionContext());
284 
285                                        try {
286                                                getJobRepository().saveOrUpdateExecutionContext(stepExecution);
287                                        }
288                                        catch (Exception e) {
289                                                fatalException.setException(e);
290                                                stepExecution.setStatus(BatchStatus.UNKNOWN);
291                                                throw new FatalException("Fatal error detected during save of step execution context", e);
292                                        }
293 
294                                        try {
295                                                itemHandler.mark();
296                                                transactionManager.commit(transaction);
297                                        }
298                                        catch (Exception e) {
299                                                fatalException.setException(e);
300                                                stepExecution.setStatus(BatchStatus.UNKNOWN);
301                                                logger.error("Fatal error detected during commit.");
302                                                throw new FatalException("Fatal error detected during commit", e);
303                                        }
304 
305                                        try {
306                                                getJobRepository().saveOrUpdate(stepExecution);
307                                        }
308                                        catch (Exception e) {
309                                                fatalException.setException(e);
310                                                stepExecution.setStatus(BatchStatus.UNKNOWN);
311                                                throw new FatalException("Fatal error detected during update of step execution", e);
312                                        }
313 
314                                }
315                                catch (Error e) {
316                                        try {
317                                                processRollback(stepExecution, contribution, fatalException, transaction);
318                                        }
319                                        catch (Exception rollbackException) {
320                                                logger.error("Rollback failed, original exception that caused the rollback is", e);
321                                                throw rollbackException;
322                                        }
323                                        throw e;
324                                }
325                                catch (Exception e) {
326                                        try {
327                                                processRollback(stepExecution, contribution, fatalException, transaction);
328                                        }
329                                        catch (Exception rollbackException) {
330                                                logger.error("Rollback failed, original exception that caused the rollback is", e);
331                                                throw rollbackException;
332                                        }
333                                        throw e;
334                                }
335                                finally {
336                                        // only release the lock if we acquired it
337                                        if (locked) {
338                                                synchronizer.release(stepExecution);
339                                        }
340                                        locked = false;
341 
342                                        // Even if the TransactionAttributes dictate that an
343                                        // exception shouldn't cause a rollback,
344                                        // we still need to throw the exception so the
345                                        // ExceptionHandler can make a determination about
346                                        // whether the exception should cause job failure.
347                                        if (nonRollbackException != null) {
348                                                if (nonRollbackException instanceof Error) {
349                                                        throw (Error) nonRollbackException;
350                                                }
351                                                else {
352                                                        throw (Exception) nonRollbackException;
353                                                }
354                                        }
355                                }
356 
357                                // Check for interruption after transaction as well, so that
358                                // the interrupted exception is correctly propagated up to
359                                // caller
360                                interruptionPolicy.checkInterrupted(stepExecution);
361 
362                                return exitStatus;
363                        }
364 
365                });
366 
367        }
368 
369        /**
370         * Execute a bunch of identical business logic operations all within a
371         * transaction. The transaction is programmatically started and stopped
372         * outside this method, so subclasses that override do not need to create a
373         * transaction.
374         * @param execution the current {@link StepExecution} which should be
375         * treated as read-only for the purposes of this method.
376         * @param contribution the current {@link StepContribution} which can accept
377         * changes to be aggregated later into the step execution.
378         * 
379         * @return true if there is more data to process.
380         */
381        protected ExitStatus processChunk(final StepExecution execution, final StepContribution contribution) {
382                ExitStatus result = chunkOperations.iterate(new RepeatCallback() {
383                        public ExitStatus doInIteration(final RepeatContext context) throws Exception {
384                                if (execution.isTerminateOnly()) {
385                                        context.setTerminateOnly();
386                                }
387                                // check for interruption before each item as well
388                                interruptionPolicy.checkInterrupted(execution);
389                                ExitStatus exitStatus = ExitStatus.FINISHED;
390 
391                                exitStatus = itemHandler.handle(contribution);
392 
393                                // check for interruption after each item as well
394                                interruptionPolicy.checkInterrupted(execution);
395                                return exitStatus;
396                        }
397                });
398                return result;
399        }
400 
401        /**
402         * @param stepExecution
403         * @param contribution
404         * @param fatalException
405         * @param transaction
406         */
407        private void processRollback(final StepExecution stepExecution, final StepContribution contribution,
408                        final ExceptionHolder fatalException, TransactionStatus transaction) {
409 
410                stepExecution.incrementReadSkipCountBy(contribution.getReadSkipCount());
411                stepExecution.incrementWriteSkipCountBy(contribution.getWriteSkipCount());
412                /*
413                 * Any exception thrown within the transaction should automatically
414                 * cause the transaction to rollback.
415                 */
416                stepExecution.rollback();
417 
418                try {
419                        itemHandler.reset();
420                        itemHandler.clear();
421                        transactionManager.rollback(transaction);
422                }
423                catch (Exception e) {
424                        /*
425                         * If we already failed to commit, it doesn't help to do this again
426                         * - it's better to allow the CommitFailedException to propagate
427                         */
428                        if (!fatalException.hasException()) {
429                                fatalException.setException(e);
430                                throw new FatalException("Failed while processing rollback", e);
431                        }
432                }
433 
434                if (fatalException.hasException()) {
435                        // Try and give the system a chance to update the stepExecution with
436                        // the failure status.
437                        getJobRepository().saveOrUpdate(stepExecution);
438                }
439        }
440 
441        private static class ExceptionHolder {
442 
443                private Exception exception;
444 
445                public boolean hasException() {
446                        return exception != null;
447                }
448 
449                public void setException(Exception exception) {
450                        this.exception = exception;
451                }
452 
453                public Exception getException() {
454                        return this.exception;
455                }
456 
457        }
458 
459        protected void close(ExecutionContext ctx) throws Exception {
460                stream.close(ctx);
461        }
462 
463        protected void open(ExecutionContext ctx) throws Exception {
464                stream.open(ctx);
465        }
466}

[all classes][org.springframework.batch.core.step.item]
EMMA 2.0.5312 (C) Vladimir Roubtsov